package com.apps.sdses.flink141.sql;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * @author jiwei
 * @description
 * @date 2023/6/14 22:51
 */
public class POI2MySQL {
    public static void main(String[] args) {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(10);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // 创建 POI 数据表
        tableEnv.executeSql("CREATE TABLE PoiStaticData ("
                + "id INT,"
                + "longitude FLOAT,"
                + "latitude FLOAT,"
                + "name STRING,"
                + "type STRING,"
                + "description STRING,"
                + "event_time TIMESTAMP(3)"
                + ") WITH ("
                + "'connector' = 'datagen',"
                + "'number-of-rows' = '822579', "
                //一次生成100万条数据"'rows-per-second' = '1',"
                + "'fields.id.min' = '1',"
                + "'fields.id.max' = '100',"
                + "'fields.longitude.min' = '116.21',"   //中国经度范围：73.66~135.05；济南经度范围：116.21~117.54
                + "'fields.longitude.max' = '117.54',"
                + "'fields.latitude.min' = '36.08',"     //中国维度范围：3.86~53.55；济南维度范围：36.08~37.23
                + "'fields.latitude.max' = '37.23',"
                + "'fields.name.length' = '10',"
                + "'fields.type.length' = '5',"
                + "'fields.description.length' = '20'"
                + ")");


//        tableEnv.executeSql("CREATE TABLE sinkTable WITH (\n" +
//                "  'connector' = 'kafka',\n" +
//                "  'topic' = 'poi_static_data',\n" +
//                "  'properties.bootstrap.servers' = '192.168.102.154:29092',\n" +
//                "  'format' = 'json'\n" +               //json格式序列化:{"id":1,"sid":"11baac5a","ts":"2023-06-12 15:09:22.849","flag":true,"vc":31}
//                ") LIKE PoiStaticData (EXCLUDING ALL)");

//        // 将数据写入 MySQL 数据库
        tableEnv.executeSql("CREATE TABLE sinkTable ("
                + "id INT,"
                + "longitude FLOAT,"
                + "latitude FLOAT,"
                + "name STRING,"
                + "type STRING,"
                + "description STRING,"
                + "event_time TIMESTAMP(3)"
                + ") WITH ("


//                + "'connector' = 'print'"
                + "'connector' = 'jdbc',"
                + "'sink.parallelism' = '10',"
                + "'url' = 'jdbc:mysql://192.168.102.155:33066/db_datacube_test',"
                + "'table-name' = 'poi_static_data',"
                + "'username' = 'datacube_test',"
                + "'password' = 'Gbd#XERA@19d1ZtD'"
//                + "'url' = 'jdbc:mysql://localhost:3306/test',"
//                + "'table-name' = 'poi_static_data',"
//                + "'username' = 'root',"
//                + "'password' = 'root'"
                + ")");
        tableEnv.executeSql("INSERT INTO sinkTable SELECT * FROM PoiStaticData");




    }
}
